package com.gemini.main.reactor;


import io.netty.util.NettyRuntime;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * Single-threaded reactor: owns one {@link Selector} and one listening
 * {@link ServerSocketChannel}, and dispatches every ready key to the
 * {@link Runnable} attached to it.
 *
 * <p>The listening key carries an {@link Acceptor}; each accepted connection is
 * handed to a new {@code Handler}, which registers itself with the same selector.
 *
 * @author zhanghailin
 */
public class Reactor implements Runnable {

    /**
     * Starts a reactor on port 8052 and runs its event loop on the calling thread.
     *
     * @param args ignored
     * @throws IOException if the selector or the listening socket cannot be set up
     */
    public static void main(String[] args) throws IOException {
        Reactor reactor = new Reactor(8052);
        reactor.run();
    }

    final Selector selector;

    final ServerSocketChannel serverSocket;

    /**
     * Opens the selector and a non-blocking server socket bound to {@code port},
     * registered for {@code OP_ACCEPT} with an {@link Acceptor} attached.
     *
     * @param port TCP port to listen on
     * @throws IOException if opening, binding or registering fails; anything that
     *                     was already opened is closed before the exception propagates
     */
    Reactor(int port) throws IOException {
        /*
         * Alternatively, use explicit SPI provider:
         * SelectorProvider p = SelectorProvider.provider();
         * selector = p.openSelector();
         * serverSocket = p.openServerSocketChannel();
         */
        Selector sel = null;
        ServerSocketChannel channel = null;
        boolean ready = false;
        try {
            sel = Selector.open();
            channel = ServerSocketChannel.open();
            channel.socket().bind(new InetSocketAddress(port));
            channel.configureBlocking(false);
            SelectionKey sk = channel.register(sel, SelectionKey.OP_ACCEPT);
            sk.attach(new Acceptor());
            ready = true;
        } finally {
            if (!ready) {
                // Do not leak the half-initialised resources when setup fails.
                closeQuietly(channel);
                closeQuietly(sel);
            }
        }
        selector = sel;
        serverSocket = channel;
    }

    /**
     * Event loop: blocks in {@code select()}, dispatches every selected key, then
     * clears the selected-key set. The loop ends when the running thread is
     * interrupted or the selector fails; in both cases all registered channels and
     * the selector are closed, so a reactor cannot be restarted after it returns.
     */
    @Override
    public void run() { // normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                selector.select();
                for (SelectionKey selectionKey : selector.selectedKeys()) {
                    dispatch(selectionKey);
                }
                selector.selectedKeys().clear();
            }
        } catch (IOException ex) {
            System.err.println("Reactor: selector failed, shutting down: " + ex);
        } finally {
            shutdown();
        }
    }

    /**
     * Runs the {@link Runnable} attached to {@code k}, if any, on the calling thread.
     *
     * @param k a selected key
     */
    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null) {
            r.run();
        }
    }

    /** Closes every channel still registered with the selector, then the selector itself. */
    private void shutdown() {
        if (selector.isOpen()) {
            // Closing a channel only marks its key as cancelled; the key set itself is
            // not modified until the next selection, so iterating it here is safe.
            for (SelectionKey key : selector.keys()) {
                closeQuietly(key.channel());
            }
        }
        closeQuietly(selector);
    }

    /** Closes {@code resource} if non-null, reporting (not propagating) any failure. */
    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception ex) {
            System.err.println("Reactor: failed to close " + resource + ": " + ex);
        }
    }

    /** Attachment of the listening key: accepts one pending connection per dispatch. */
    class Acceptor implements Runnable {

        /**
         * Accepts a connection, if one is pending, and creates a {@code Handler} for it.
         * Failures are reported and the accepted channel is closed rather than leaked.
         */
        @Override
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c == null) {
                    // Non-blocking accept: nothing was actually pending.
                    return;
                }
                try {
                    new Handler(selector, c);
                } catch (IOException ex) {
                    closeQuietly(c);
                    throw ex;
                }
            } catch (IOException ex) {
                System.err.println("Reactor: failed to accept connection: " + ex);
            }
        }
    }
}

/**
 * Per-connection handler. It is attached to the connection's selection key and
 * run by the reactor whenever the key is ready:
 * <ol>
 *   <li>{@code READING}: read from the socket on the selector thread;</li>
 *   <li>{@code PROCESSING}: {@link #process()} runs on the shared worker pool;</li>
 *   <li>{@code SENDING}: write the output buffer on the selector thread, then
 *       close the connection.</li>
 * </ol>
 */
final class Handler implements Runnable {

    static final int READING = 0, SENDING = 1, PROCESSING = 3, MAXIN = 1024, MAXOUT = 2048;

    /** Worker pool shared by all handlers, sized at twice the available processor count. */
    final static Executor executor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() << 1);

    final SocketChannel socket;
    final SelectionKey sk;

    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);

    /**
     * Current phase. Written by the selector thread (read) and by pool threads
     * (processAndHandOff) and read in {@link #run()} without the lock, hence volatile.
     */
    volatile int state = READING;

    /**
     * Switches {@code c} to non-blocking mode and registers it with {@code selector}
     * for {@code OP_READ}, with this handler as the key's attachment.
     *
     * @param selector selector that will report readiness of the connection
     * @param c        freshly accepted connection; closed here if registration fails
     * @throws IOException if the channel cannot be configured or registered
     */
    Handler(Selector selector, SocketChannel c) throws IOException {
        socket = c;
        boolean registered = false;
        try {
            c.configureBlocking(false);
            // Important: register with the attachment in one step so the key is never
            // visible to the selector without its handler.
            sk = c.register(selector, SelectionKey.OP_READ, this);
            registered = true;
        } finally {
            if (!registered) {
                try {
                    c.close();
                } catch (IOException ignored) {
                    // The original failure is the one that propagates.
                }
            }
        }
        selector.wakeup();
    }

    private boolean inputIsComplete() {
        // TODO
        return true;
    }

    private boolean outputIsComplete() {
        // TODO
        return true;
    }

    private void process() {
        System.out.println("processing...");
    }

    /**
     * Performs the I/O step matching the current state. Does nothing while the
     * request is being processed. Any I/O failure closes the connection.
     */
    @Override
    public void run() {
        try {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                send();
            }
        } catch (IOException ex) {
            System.err.println("Handler: I/O failure on " + socket + ", closing: " + ex);
            close();
        }
    }

    /**
     * Reads available bytes into {@code input}. On end-of-stream the connection is
     * closed; once the input is complete, processing is handed to the worker pool.
     */
    private synchronized void read() throws IOException {
        int n = socket.read(input);
        if (n < 0) {
            // Peer closed the connection: nothing to process, release the channel.
            close();
            return;
        }
        if (inputIsComplete()) {
            state = PROCESSING;
            // Stop read notifications while a worker owns the request; otherwise more
            // incoming data would make the selector return immediately on every pass.
            sk.interestOps(0);
            executor.execute(new Processer());
        }
    }

    /** Runs on a pool thread: processes the request, then asks the selector for write readiness. */
    private synchronized void processAndHandOff() {
        process();
        state = SENDING; // or rebind attachment
        if (sk.isValid()) {
            sk.interestOps(SelectionKey.OP_WRITE);
            // An interest-set change is not seen by a select() already in progress,
            // so wake the selector thread to make it re-select with OP_WRITE.
            sk.selector().wakeup();
        }
    }

    /** Task submitted to the worker pool for one request. */
    class Processer implements Runnable {
        @Override
        public void run() {
            processAndHandOff();
        }
    }

    /**
     * Writes {@code output} to the socket and closes the connection once the
     * output is complete.
     */
    private void send() throws IOException {
        // NOTE(review): process() currently leaves output untouched, so this writes
        // the freshly allocated buffer as-is (up to MAXOUT zero bytes) - confirm the
        // intended fill/flip protocol once process() is implemented.
        socket.write(output);
        if (outputIsComplete()) {
            close();
        }
    }

    /** Cancels the selection key and closes the socket; safe to call more than once. */
    private synchronized void close() {
        sk.cancel();
        try {
            socket.close();
        } catch (IOException ex) {
            System.err.println("Handler: failed to close " + socket + ": " + ex);
        }
    }
}



